/**
 *  Copyright 2007-2008 University Of Southern California
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package edu.isi.pegasus.planner.parser.pdax;


import edu.isi.pegasus.common.logging.LogManagerFactory;
import edu.isi.pegasus.planner.code.CodeGeneratorFactory;

import edu.isi.pegasus.planner.code.GridStartFactory;

import edu.isi.pegasus.planner.classes.ADag;
import edu.isi.pegasus.planner.classes.PlannerOptions;
import edu.isi.pegasus.planner.classes.Job;

import edu.isi.pegasus.common.logging.LogManager;

import edu.isi.pegasus.common.util.CondorVersion;

import edu.isi.pegasus.planner.common.PegasusProperties;
import edu.isi.pegasus.common.util.StreamGobbler;
import edu.isi.pegasus.common.util.DefaultStreamGobblerCallback;
import edu.isi.pegasus.common.util.StreamGobblerCallback;

import edu.isi.pegasus.planner.namespace.Pegasus;

import edu.isi.pegasus.planner.partitioner.Partition;
import edu.isi.pegasus.planner.partitioner.DAXWriter;

import edu.isi.pegasus.planner.catalog.TransformationCatalog;
import edu.isi.pegasus.planner.catalog.transformation.TransformationCatalogEntry;

import edu.isi.pegasus.planner.catalog.transformation.TransformationFactory;

import org.griphyn.vdl.euryale.FileFactory;
import org.griphyn.vdl.euryale.HashedFileFactory;
import org.griphyn.vdl.euryale.FlatFileFactory;

import edu.isi.pegasus.planner.catalog.transformation.classes.TCType;


import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.io.BufferedWriter;
import java.io.FilenameFilter;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import java.util.regex.Pattern;

import java.text.NumberFormat;
import java.text.DecimalFormat;
import edu.isi.pegasus.planner.namespace.Condor;
import edu.isi.pegasus.planner.namespace.ENV;


/**
 * This callback creates the mega DAG that contains the smaller DAGs, each
 * corresponding to one partition (level) identified in the pdax file
 * generated by the partitioner.
 *
 * @author Karan Vahi
 * @version $Revision: 4507 $
 */
public class PDAX2MDAG implements Callback {



    /**
     * The code generator implementation that is to be loaded, taken from
     * <code>CodeGeneratorFactory.CONDOR_CODE_GENERATOR_CLASS</code>.
     */
    public static final String CODE_GENERATOR_CLASS =
                                CodeGeneratorFactory.CONDOR_CODE_GENERATOR_CLASS;

    /**
     * The prefix for the submit directory.
     */
    public static final String SUBMIT_DIRECTORY_PREFIX = "run";




    /**
     * The number of jobs into which each job in the partition graph is
     * expanded. In the part of the file reviewed here it is only used to
     * size the (unused) sequence list local in cbPartition.
     */
    public static final int NUM_OF_EXPANDED_JOBS = 2;

    /**
     * The index of the head job in the expanded sequence of jobs.
     * NOTE(review): only referenced from commented-out code in the part of
     * the file reviewed here.
     */
    public static final int HEAD_INDEX = 0;

    /**
     * The index of the tail job in the expanded sequence of jobs.
     * NOTE(review): only referenced from commented-out code in the part of
     * the file reviewed here.
     */
    public static final int TAIL_INDEX = 1;

    /**
     * The logical name with which to query the transformation catalog for
     * the planner (cPlanner) executable.
     */
    public static final String CPLANNER_LOGICAL_NAME = "pegasus-plan";

    /**
     * The namespace to use for condor dagman.
     */
    public static final String CONDOR_DAGMAN_NAMESPACE = "condor";

    /**
     * The logical name with which to query the transformation catalog for the
     * condor_dagman executable, that ends up running the mini dag as one
     * job.
     */
    public static final String CONDOR_DAGMAN_LOGICAL_NAME = "dagman";

    /**
     * The namespace to which the jobs in the mega DAG being created refer.
     */
    public static final String NAMESPACE = "pegasus";

    /**
     * The logical name of the planner utility that needs to be called as a
     * prescript. Has the same value as CPLANNER_LOGICAL_NAME.
     */
    public static final String RETRY_LOGICAL_NAME = "pegasus-plan";

    /**
     * The dagman knobs controlled through properties. Each row maps a
     * property name (first element) to the corresponding dagman command line
     * option (second element, padded with spaces on both sides).
     */
    public static final String DAGMAN_KNOBS[][]={
        { "pegasus.dagman.maxpre",  " -MaxPre " },
        { "pegasus.dagman.maxpost", " -MaxPost " },
        { "pegasus.dagman.maxjobs", " -MaxJobs " },
        { "pegasus.dagman.maxidle", " -MaxIdle " },
    };

    /**
     * The file separator to be used on the submit host. Initialised from
     * <code>File.separatorChar</code>.
     */
    protected static char mSeparator = File.separatorChar;

    /**
     * The directory in which the daxes corresponding to the partitions are
     * kept. This should be the same directory where the pdax containing the
     * partition graph resides. Set once in the constructor.
     */
    private String mPDAXDirectory;

    /**
     * The root of the submit directory where all the submit directories for
     * the various partitions reside. Set in cbDocument from the planner
     * options.
     */
    private String mSubmitDirectory;

    /**
     * The abstract dag object that ends up holding the mega DAG. Created in
     * cbDocument.
     */
    private ADag mMegaDAG;

    /**
     * The internal map that maps the partition id to the job responsible
     * for executing the partition. Populated in cbPartition.
     */
    private Map mJobMap;

    /**
     * The internal map that maps the job id of the partition to the
     * head and tail jobs in the linear sequence of jobs to which the
     * partition job is expanded. Currently disabled.
     */
    //private Map mSequenceMap;

    /**
     * The handle to the properties object.
     */
    private PegasusProperties mProps;

    /**
     * The handle to the transformation catalog.
     */
    private TransformationCatalog mTCHandle;

    /**
     * The handle to the logging object.
     */
    private LogManager mLogger;

    /**
     * The object containing the options that were given to the concrete
     * planner at runtime.
     */
    private PlannerOptions mPOptions;


    /**
     * The path to the properties file that is written out and shared by
     * all partitions in the mega DAG. It is null until cbDocument has
     * written the properties out into the submit directory.
     */
    private String mMDAGPropertiesFile;

    /**
     * The handle to the file factory that is used to create the top level
     * directories for each of the partitions. Set in cbDocument.
     */
    private FileFactory mFactory;

    /**
     * An instance of the default stream gobbler callback implementation that
     * is used for creating symbolic links. It is constructed to log at the
     * debug message level.
     */
    private StreamGobblerCallback mDefaultCallback;

    /**
     * The number formatter to format the run submit dir entries. Uses the
     * pattern "0000".
     */
    private NumberFormat mNumFormatter;

    /**
     * The user name of the user running Pegasus, read from the "user.name"
     * property; defaults to "user" when the property is not set.
     */
    private String mUser;

    /**
     * A flag to store whether the parsing is complete or not. Set to true
     * in cbDone.
     */
    private boolean mDone;

    /**
     * Any extra arguments that need to be passed to dagman, as determined
     * from the properties file.
     */
    private String mDAGManKnobs;

    /**
     * The long value of the condor version. The constructor treats a value
     * of -1 as "version could not be determined".
     */
    private long mCondorVersion;

    /**
     * The cleanup scope for the workflows, as returned by the properties.
     */
    private PegasusProperties.CLEANUP_SCOPE mCleanupScope;

    /**
     * Bag of initialization objects. Currently disabled.
     */
    //private PegasusBag mBag;

    /**
     * The overloaded constructor.
     *
     * @param directory the directory where the pdax and all the daxes
     *                  corresponding to the partitions reside.
     * @param properties the <code>PegasusProperties</code> to be used.
     * @param options    the options passed to the planner.
     */
    public PDAX2MDAG( String directory, PegasusProperties properties, PlannerOptions options) {
        mPDAXDirectory = directory;
        mProps     = properties;
        mLogger    =  LogManagerFactory.loadSingletonInstance( properties );
        mPOptions  = options;
        
        mTCHandle = TransformationFactory.loadInstance( mProps, mLogger );
        
        mMDAGPropertiesFile = null;
        mNumFormatter = new DecimalFormat( "0000" );

        mDone = false;
        mUser = mProps.getProperty( "user.name" ) ;
        if ( mUser == null ){ mUser = "user"; }

        //the default gobbler callback always log to debug level
        mDefaultCallback =
               new DefaultStreamGobblerCallback(LogManager.DEBUG_MESSAGE_LEVEL);

        mDAGManKnobs = constructDAGManKnobs( properties );
        mCleanupScope = mProps.getCleanupScope();
        
        mCondorVersion = CondorVersion.getInstance( mLogger ).numericValue();
        if( mCondorVersion == -1 ){
            mLogger.log( "Unable to determine the version of condor " , LogManager.WARNING_MESSAGE_LEVEL );
        }
        else{
            mLogger.log( "Condor Version detected is " + mCondorVersion , LogManager.DEBUG_MESSAGE_LEVEL );
        }
        
    }


    /**
     * Verifies that the destination location is usable as a base directory.
     * A location that does not exist is created (including missing parent
     * directories). A location that exists has to be a directory that can
     * be written to.
     *
     * @param dir is the new base directory to optionally create.
     *
     * @throws IOException if the location cannot be created, exists but is
     *                     not a directory, or is a directory that cannot be
     *                     written to.
     */
    protected static void sanityCheck( File dir ) throws IOException{
        // does not exist yet, try to make it
        if ( !dir.exists() ) {
            if ( dir.mkdirs() ) {
                return;
            }
            throw new IOException( "Unable to create directory destination " +
                                   dir.getPath() );
        }

        // exists, but is something other than a directory
        if ( !dir.isDirectory() ) {
            throw new IOException( "Destination " + dir.getPath() + " already " +
                                   "exists, but is not a directory." );
        }

        // is a directory, but we are not allowed to write into it
        if ( !dir.canWrite() ) {
            throw new IOException( "Cannot write to existing directory " +
                                   dir.getPath() );
        }

        // existing, writable directory. all is well
    }



    /**
     * Callback when the opening tag was parsed. This contains all
     * attributes and their raw values within a map. The callback
     * initializes the mega DAG and the partition to job map, determines and
     * sets up the submit directory structure, creates the file factory used
     * for the partition directories, and writes out the properties file
     * into the submit directory.
     *
     * @param attributes is a map of attribute key to attribute value
     *
     * @throws RuntimeException if the submit directory structure cannot be
     *         set up, the partition count is not a legal number, or the
     *         properties file cannot be written out. The underlying
     *         exception is always attached as the cause.
     */
    public void cbDocument(Map attributes) {
        mMegaDAG     = new ADag();
        mJobMap      = new HashMap();

        //the name of the mega dag is set to the name
        //attribute in the pdax
        mMegaDAG.dagInfo.nameOfADag = (String)attributes.get("name");
        mMegaDAG.dagInfo.count      = (String)attributes.get("count");
        mMegaDAG.dagInfo.index      = (String)attributes.get("index");

        // create files in the directory, unless anything else is known.
        try {
            //create a submit directory structure if required, i.e. only
            //when no relative directory was specified in the options
            String relativeDir = ( mPOptions.getRelativeDirectory() == null ) ?
                                          this.createSubmitDirectory( mMegaDAG.getLabel(),
                                                                      mPOptions.getSubmitDirectory(),
                                                                      mUser,
                                                                      mPOptions.getVOGroup(),
                                                                      mProps.useTimestampForDirectoryStructure() ):
                                          mPOptions.getRelativeDirectory();

            //set the directory structure
            mPOptions.setSubmitDirectory( mPOptions.getBaseSubmitDirectory(), relativeDir);
            mSubmitDirectory = mPOptions.getSubmitDirectory();

            //we want to set the relative directory as the base working
            //directory for all the partition on the remote sites.
            mPOptions.setRandomDir( relativeDir );

            mFactory = new FlatFileFactory(mSubmitDirectory); // minimum default
        } catch ( IOException ioe ) {
            throw new RuntimeException( "Unable to generate files in the submit directory " ,
                                        ioe );
        }


        // not in the PDAX format currently
        String s = (String) attributes.get("partitionCount");

        //tracks how far the block below got, so that an IOException can be
        //attributed to the right step. The properties file field cannot be
        //used for this, as it is only assigned once everything succeeded.
        boolean factoryCreated = false;

        // create hashed, and levelled directories
        try {
            HashedFileFactory temp = null;
            int partCount =  ( s == null ) ?
                            //determine at runtime the number of partitions
                            getPartitionCount(mPOptions.getPDAX()) :
                            Integer.parseInt(s) ;

            //NOTE(review): the fallback factory is rooted at the pdax
            //directory and not at the submit directory - confirm intended.
            if ( partCount > 0 ) {
                temp = new HashedFileFactory( mSubmitDirectory, partCount );
            }
            else {
                temp = new HashedFileFactory( mPDAXDirectory );
            }

            //each job creates the following files
            //  - submit file
            //  - out file
            //  - error file
            //  - prescript log
            //  - the partition directory
            temp.setMultiplicator(5);

            //we want a minimum of one level always for clarity
            temp.setLevels(1);

            mFactory = temp;
            factoryCreated = true;

            //write out all the properties into a temp file
            //in the root submit directory
            mMDAGPropertiesFile = mProps.writeOutProperties( mSubmitDirectory );

        }
        catch ( NumberFormatException nfe ) {
            String error =  ( s == null ) ?
                            "Unspecified number for jobCount":
                            "Illegal number \"" + s + "\" for partition count";
            //keep the original exception as the cause
            throw new RuntimeException( error, nfe );
        }
        catch ( IOException e ) {
            //figure out where error happened
            String message = factoryCreated ?
                             "Unable to write out properties file in base submit directory":
                             "Base directory creation";

            //wrap into runtime and throw
            throw new RuntimeException( message, e );
        }

    }

    /**
     * Callback for the partition. These partitions are completely
     * assembled, but each is passed separately. For each partition a
     * submit directory is created, the dax file of the partition is
     * symlinked into it, and a DAG job for the partition is constructed,
     * added to the mega DAG and remembered against the partition id.
     *
     * @param partition is the PDAX-style partition.
     *
     * @throws RuntimeException if the partition submit directory cannot be
     *         created.
     */
    public void cbPartition(Partition partition) {
        String name = partition.getName();
        int index   = partition.getIndex();

        //construct the path to the dax file containing the partition
        String dax = mPDAXDirectory + File.separator +
                     DAXWriter.getPDAXFilename(name, index);

        File partitionDirectory;
        try{
            partitionDirectory = mFactory.createFile( getBaseName(partition)  );

            //mkdirs also returns false when the directory already exists,
            //so only a directory that is still missing is an error
            if ( !partitionDirectory.mkdirs() && !partitionDirectory.isDirectory() ) {
                throw new IOException( "Unable to create directory " +
                                       partitionDirectory.getPath() );
            }

            //construct a symlink to the dax file in the partition directory.
            //a failure here is only logged as a warning.
            if (!createSymlink( dax, partitionDirectory)){
                mLogger.log("Unable to create symlinks of the dax file to submit dir",
                            LogManager.WARNING_MESSAGE_LEVEL);
            }
        }
        catch(IOException e){
            //wrap and throw
            throw new RuntimeException( "Unable to create partition submit directory ",
                                        e );
        }

        //construct the dagman job for the partition, with the
        //prescript set as an invocation to the planner
        Job job = constructDAGJob( partition , partitionDirectory, dax);

        //add to the workflow
        mMegaDAG.add(job);

        //map the partition id to the job that is constructed.
        mJobMap.put(partition.getID(),job);

    }

    /**
     * Callback for child and parent relationships from section 3. This ties
     * in the relations between the partitions to the relations between the jobs
     * that are responsible for partitions. In addition, a <code>--cache</code>
     * argument listing the cache files of all the parent jobs (comma
     * separated) is appended to the prescript arguments of the child job.
     *
     * @param child is the IDREF of the child element.
     * @param parents is a list of IDREFs of the included parents.
     *
     * @throws RuntimeException if no job is known for the child or for one
     *         of the parents.
     */
    public void cbParents(String child, List parents) {
        //get hold of the constructed job for the child.
        //the name of the jobs are treated as ID's
        Job cJob = getJob(child);
        if( cJob == null ){
            throw new RuntimeException( "No job found for child partition " + child );
        }
        String cID = cJob.getName();

        //the cache option is only generated if there are parents
        StringBuffer cacheArgs = null;
        if( !parents.isEmpty() ){
            cacheArgs = new StringBuffer( " --cache " );
        }

        //traverse through the parents to put in the relations
        //and the cache file arguments.
        boolean first = true;
        for( Iterator it = parents.iterator(); it.hasNext(); ){
            //get the parent job and name
            Object parent = it.next();
            Job pJob = (Job)mJobMap.get( parent );
            if( pJob == null ){
                throw new RuntimeException( "No job found for parent partition " + parent +
                                            " of child partition " + child );
            }
            String pID = pJob.getName();

            mLogger.log("Adding Relation " + pID + "->" + cID,
                        LogManager.DEBUG_MESSAGE_LEVEL);
            mMegaDAG.addNewRelation(pID,cID);

            //we need to specify the cache files for those partitions
            //even if they are not constructed. there is a disconnect
            //as to how the names are being generated. There should be
            //a call to one function only.
            if( !first ){
                cacheArgs.append( ',' );
            }
            cacheArgs.append( getCacheFilePath(pJob) );
            first = false;
        }

        //stuff the arguments back into replanner prescript.
        //should be a callout to a different function for portability
        String args = cJob.getPreScriptArguments();
        cJob.setPreScript( cJob.getPreScriptPath(),
                           (cacheArgs == null)?
                           //remains the same
                           args:
                           //existing arguments followed by the cache option
                           args + cacheArgs.toString()
                           );

    }

    /**
     * Callback when the parsing of the document is done. Marks the mega DAG
     * as complete and populates the flow related information (flow name,
     * timestamp, modification time of the pdax, flow id and release version)
     * in the dag info of the mega DAG.
     *
     * The generation of the submit files for the mega DAG is not triggered
     * from here; according to the note left in the original code it was
     * moved to the CPlanner class.
     */
    public void cbDone() {
        //the mega DAG can now be handed out via getConstructedObject()
        mDone = true;

        //generate the classad's options
        //for the Mega DAG
        mMegaDAG.dagInfo.generateFlowName();

        boolean extendedTimestamp = mProps.useExtendedTimeStamp();
        mMegaDAG.dagInfo.setFlowTimestamp( mPOptions.getDateTime( extendedTimestamp ) );

        File pdax = new File( mPOptions.getPDAX() );
        mMegaDAG.dagInfo.setDAXMTime( pdax );

        mMegaDAG.dagInfo.generateFlowID();
        mMegaDAG.dagInfo.setReleaseVersion();
    }



   /**
    * Returns the mega DAG that was generated while parsing the pdax.
    *
    * @return  ADag object containing the mega DAG
    *
    * @throws RuntimeException if called before the parsing has completed,
    *         i.e. before cbDone() was invoked.
    */
   public Object getConstructedObject(){
       if( mDone ){
           return mMegaDAG;
       }

       //parsing has not finished as yet
       throw new RuntimeException("Method called before the megadag " +
                                  " was fully generated");
   }


    /**
     * Constructs a job that plans and submits the partitioned workflow,
     * referred to by a Partition. The main job itself is a condor dagman job
     * that submits the concrete workflow. The concrete workflow is generated by
     * running the planner in the prescript for the job.
     *
     * The name of the job is a deep lfn, made up of the path of the parent
     * of the partition directory relative to the root submit directory,
     * followed by the partition id. The path to the dagman executable is
     * picked up from the environment first, then from the transformation
     * catalog, and finally from a default entry for the local site.
     *
     * @param partition   the partition corresponding to which the job has to be
     *                    constructed.
     * @param directory   the submit directory where the submit files for the
     *                    partition should reside.
     * @param dax         the absolute path to the partitioned dax file that
     *                    corresponds to this partition.
     *
     * @return the constructed DAG job.
     *
     * @throws RuntimeException if the transformation catalog cannot be
     *         accessed, or no entry for dagman can be determined.
     */
    protected Job constructDAGJob( Partition partition ,
                                       File directory,
                                       String dax){

        //for time being use the old functions.
        Job job = new Job();

        //the parent directory where the submit file for condor dagman has to
        //reside. the submit files for the corresponding partition are one level
        //deeper.
        String parentDir = directory.getParent();

        //set the logical transformation
        job.setTransformation(CONDOR_DAGMAN_NAMESPACE,
                              CONDOR_DAGMAN_LOGICAL_NAME,
                              null);

        //set the logical derivation attributes of the job.
        job.setDerivation(CONDOR_DAGMAN_NAMESPACE,
                          CONDOR_DAGMAN_LOGICAL_NAME,
                          null);

        //always runs on the submit host
        job.setSiteHandle("local");

        //set the logical id for the job same as the partition id.
        job.setLogicalID(partition.getID());

        //figure out the relative submit directory where the dagman job should
        //reside. It should be one level up from the partition directory.
        //It is empty if the parent is the root submit directory itself.
        String relativeDir = parentDir.equals(mSubmitDirectory) ?
                             "" :
                             parentDir.substring( mSubmitDirectory.length() );

        //construct the name of the job as a deep lfn with a directory path
        StringBuffer name = new StringBuffer();

        //get the part from the first file separator onwards
        name.append( (relativeDir.indexOf(File.separatorChar) == 0) ?
                     relativeDir.substring(1) :
                     relativeDir );

        //append a file separator in the end if dir was some name
        if( relativeDir.length() > 1 ) {
            name.append(File.separatorChar);
        }

        //set the basename for the deep lfn
        name.append(partition.getID());
        job.setName(name.toString());

        TransformationCatalogEntry entry = null;

        //get the path to condor dagman
        try{
            //try to construct the path from the environment
            entry = constructTCEntryFromEnvironment( );

            //try to construct from the TC
            if( entry == null ){
                List entries = mTCHandle.lookup(job.namespace, job.logicalName,
                                                job.version, job.getSiteHandle(),
                                                TCType.INSTALLED);

                //the TC mechanism is expected to return null if no record
                //is found. an empty list is treated the same way instead of
                //failing on the access of the first element.
                entry = (entries == null || entries.isEmpty()) ?
                    defaultTCEntry( "local") ://construct from site catalog
                    (TransformationCatalogEntry) entries.get(0);
            }
        }
        catch(Exception e){
            throw new RuntimeException( "ERROR: While accessing the Transformation Catalog",e);
        }
        if(entry == null){
            //throw appropriate error
            throw new RuntimeException("ERROR: Entry not found in tc for job " +
                                        job.getCompleteTCName() +
                                        " on site " + job.getSiteHandle());
        }

        //set the path to the executable
        job.executable = entry.getPhysicalTransformation();
        //the environment variable are set later automatically from the tc

        //the job itself is the main job of the super node
        //construct the classad specific information
        job.jobID = job.getName();
        job.jobClass = Job.COMPUTE_JOB;


        //directory where all the dagman related files for the nested dagman
        //reside. Same as the directory passed as an input parameter
        String partitionDir = directory.getAbsolutePath();

        //make the initial dir point to the submit file dir for the partition
        //we can do this as we are running this job both on local host, and scheduler
        //universe. Hence, no issues of shared filesystem or anything.
        job.condorVariables.construct("initialdir", partitionDir);


        //construct the argument string, with all the dagman files
        //being generated in the partition directory. Using basenames as
        //initialdir has been specified for the job.
        StringBuffer sb = new StringBuffer();

        sb.append(" -f -l . -Debug 3").
           append(" -Lockfile ").append( getBasename( partition, ".dag.lock") ).
           append(" -Dag ").append( getBasename( partition, ".dag"));

        //specify condor log for condor version less than 7.1.2
        if( mCondorVersion < CondorVersion.v_7_1_2 ){
            sb.append(" -Condorlog ").append(getBasename( partition, ".log"));
        }

        //allow for version mismatch as after 7.1.3 condor does tight
        //checking on dag.condor.sub file and the condor version used
        if( mCondorVersion >= CondorVersion.v_7_1_3 ){
            sb.append( " -AllowVersionMismatch " );
        }

        //we append the Rescue DAG option only if old version
        //of Condor is used < 7.1.0.  To detect we check for a non
        //zero value of --rescue option to pegasus-plan
        //Karan June 27, 2007
        mLogger.log( "Number of Resuce retries " + mPOptions.getNumberOfRescueTries() ,
                     LogManager.DEBUG_MESSAGE_LEVEL );
        if( mCondorVersion >= CondorVersion.v_7_1_0 || mPOptions.getNumberOfRescueTries() > 0 ){
            mLogger.log( "Constructing arguments to dagman in 7.1.0 and later style",
                         LogManager.DEBUG_MESSAGE_LEVEL );
            sb.append( " -AutoRescue 1 -DoRescueFrom 0 ");
        }
        else{
            mLogger.log( "Constructing arguments to dagman in pre 7.1.0 style",
                         LogManager.DEBUG_MESSAGE_LEVEL );
            sb.append(" -Rescue ").append(getBasename( partition, ".dag.rescue"));
        }

        //pass any dagman knobs that were specified in properties file
        sb.append( this.mDAGManKnobs );

        //put in the environment variables that are required
        job.envVariables.construct("_CONDOR_DAGMAN_LOG",
                                   getAbsolutePath( partition, partitionDir,".dag.dagman.out"));
        job.envVariables.construct("_CONDOR_MAX_DAGMAN_LOG","0");

        //set the arguments for the job
        job.setArguments(sb.toString());

        //the environment need to be propagated for exitcode to be picked up
        job.condorVariables.construct("getenv","TRUE");

        job.condorVariables.construct("remove_kill_sig","SIGUSR1");

        //no separate "log" key is constructed for the dagman job here.
        //the log file is kept common for all jobs and dagman.

        //the job needs to be explicitly launched in
        //scheduler universe instead of local universe
        job.condorVariables.construct( Condor.UNIVERSE_KEY, Condor.SCHEDULER_UNIVERSE );


        //add any notifications specified in the transformation
        //catalog for the job. JIRA PM-391
        job.addNotifications( entry );

        //incorporate profiles from the transformation catalog
        //and properties for the time being. Not from the site catalog.

        //the profile information from the transformation
        //catalog needs to be assimilated into the job
        //overriding the one from pool catalog.
        job.updateProfiles( entry );

        //the profile information from the properties file
        //is assimilated overriding the one from transformation
        //catalog.
        job.updateProfiles(mProps);

        //constructed the main job. now construct the prescript
        //the log file resides in the directory where the condor_dagman
        //job resides i.e the parent directory.
        StringBuffer log = new StringBuffer();
        log.append(parentDir).append(mSeparator).append(partition.getID()).
            append(".pre.log");
        //set the prescript for the job in the dagman namespace
        setPrescript( job, dax, log.toString());

        //we do not want the job to be launched via kickstart
        //Fix for Pegasus bug number 143
        //http://bugzilla.globus.org/vds/show_bug.cgi?id=143
        job.vdsNS.construct( Pegasus.GRIDSTART_KEY,
                             GridStartFactory.GRIDSTART_SHORT_NAMES[GridStartFactory.NO_GRIDSTART_INDEX] );

        return job;
    }


    /**
     * Placeholder for building a fallback transformation catalog entry, to be
     * used when no entry is found in the transformation catalog.
     *
     * @param site   the site for which the fallback entry is wanted.
     *
     *
     * @return  always <code>null</code>, as no default entry is constructed.
     */
    private TransformationCatalogEntry defaultTCEntry( String site ){
        //this class has no handle to the site catalog, hence there is
        //nothing a default entry could be built from
        final TransformationCatalogEntry noDefault = null;
        return noDefault;
    }

    /**
     * Builds a transformation catalog entry for condor dagman from the
     * environment of the running JVM.
     *
     * The following environment variables are copied, when set, into a set
     * of environment profiles that is then handed to
     * {@link #constructTCEntryFromEnvProfiles(ENV)}
     * 1) CONDOR_HOME
     * 2) CONDOR_LOCATION
     *
     * CONDOR_HOME takes precedence over CONDOR_LOCATION
     *
     *
     * @return  the constructed entry else null.
     */
    private  TransformationCatalogEntry constructTCEntryFromEnvironment( ){
        Map<String,String> environment = System.getenv();
        ENV profiles = new ENV();

        //copy over only the variables that can point to a condor install
        for( String variable : new String[]{ "CONDOR_HOME", "CONDOR_LOCATION" } ){
            if( environment.containsKey( variable ) ){
                profiles.construct( variable, environment.get( variable ) );
            }
        }

        return constructTCEntryFromEnvProfiles( profiles );
    }
    
    /**
     * Builds a transformation catalog entry for condor dagman from a set of
     * environment profiles.
     *
     * The entry points to <code>bin/condor_dagman</code> underneath the
     * directory named by one of the following variables
     * 1) CONDOR_HOME
     * 2) CONDOR_LOCATION
     *
     * CONDOR_HOME takes precedence over CONDOR_LOCATION
     *
     * @param env  the environment profiles.
     *
     *
     * @return  the entry constructed else null if environment variables not defined.
     */
    private TransformationCatalogEntry constructTCEntryFromEnvProfiles( ENV env ) {
        //settle on the variable to use, CONDOR_HOME first
        final String key;
        if( env.containsKey( "CONDOR_HOME" ) ){
            key = "CONDOR_HOME";
        }
        else if( env.containsKey( "CONDOR_LOCATION" ) ){
            key = "CONDOR_LOCATION";
        }
        else{
            //neither variable is defined, nothing to construct
            return null;
        }

        mLogger.log( "Constructing path to dagman on basis of env variable " + key,
                     LogManager.DEBUG_MESSAGE_LEVEL );

        TransformationCatalogEntry dagman = new TransformationCatalogEntry();
        dagman.setLogicalTransformation( CONDOR_DAGMAN_NAMESPACE,
                                         CONDOR_DAGMAN_LOGICAL_NAME,
                                         null );
        dagman.setType( TCType.INSTALLED );
        dagman.setResourceId( "local" );

        //the executable is <value of key>/bin/condor_dagman
        String executable = env.get( key ) + File.separator +
                            "bin" + File.separator +
                            "condor_dagman";
        dagman.setPhysicalTransformation( executable );

        return dagman;
    }


    /**
     * Writes out the braindump.txt file for a partition in the partition submit
     * directory. The braindump.txt file is used for passing to the tailstatd
     * daemon that monitors the state of execution of the workflow.
     *
     * The contents are assembled fully before the file is opened, the writer
     * is always closed, and a failed write is reported as an
     * <code>IOException</code> instead of being silently dropped.
     *
     * @param directory   the directory in which the braindump file needs to
     *                    be written to.
     * @param partition   the partition for which the braindump is to be written out.
     * @param dax         the dax file
     * @param dag         the dag file
     *
     * @return the absolute path to the braindump.txt file written in the directory.
     *
     * @throws IOException in case of error while writing out file.
     */
    protected String writeOutBraindump( File directory,
                                        Partition partition,
                                        String dax,
                                        String dag)
                                        throws IOException{

        //sanity check on the directory
        sanityCheck( directory );

        //store absolute path to dir just once
        String absPath = directory.getAbsolutePath();

        //assemble all the contents in a buffer before opening the file, so
        //that a failure here does not leave a truncated file behind
        StringBuffer contents = new StringBuffer();
        contents.append("dax ").append(dax).append("\n").
                 append("dag ").append(dag).append("\n").
                 append("run ").append(absPath).append("\n").
                 append("jsd ").append(absPath).append(mSeparator).append("jobstate.log").append("\n").
                 append("rundir ").append(directory.getName()).append("\n").
                 append("bindir ").append(mProps.getBinDir()).append("\n").
                 append("vogroup pegasus").append("\n").//for time being
                 append("label " + partition.getName());

        //create a writer to the braindump.txt in the directory.
        File f = new File( directory , "braindump.txt");
        PrintWriter writer =
                  new PrintWriter(new BufferedWriter(new FileWriter(f)));
        try{
            writer.write( contents.toString() );

            //a PrintWriter never throws on write. checkError flushes the
            //stream and reports whether any write has failed so far
            if( writer.checkError() ){
                throw new IOException( "Unable to write out the braindump file " +
                                       f.getAbsolutePath() );
            }
        }
        finally{
            writer.close();
        }

        return f.getAbsolutePath();
    }



    /**
     * Writes out the properties to a temporary file in the directory passed.
     * Only the properties matching the <code>pegasus</code> prefix are
     * written. The output stream is closed even if storing the properties
     * fails.
     *
     * @param directory   the directory in which the properties file needs to
     *                    be written to.
     *
     * @return the absolute path to the properties file written in the directory.
     *
     * @throws IOException in case of error while writing out file.
     */
    protected String writeOutProperties( String directory ) throws IOException{
        File dir = new File(directory);

        //sanity check on the directory
        sanityCheck( dir );

        //we only want to write out the Pegasus properties for time being
        Properties properties = mProps.matchingSubset( "pegasus", true );

        //create a temporary file in directory
        File f = File.createTempFile( "pegasus.", ".properties", dir );

        //the header of the file
        StringBuffer header = new StringBuffer(64);
        header.append("PEGASUS USER PROPERTIES AT RUNTIME \n")
              .append("#ESCAPES IN VALUES ARE INTRODUCED");

        //create an output stream to this file and write out the properties.
        //the stream is released in the finally block so that a failed store
        //does not leak the file handle
        OutputStream os = new FileOutputStream(f);
        try{
            properties.store( os, header.toString() );
        }
        finally{
            os.close();
        }

        return f.getAbsolutePath();
    }

    /**
     * Sets the prescript for a job to the default retry wrapper, i.e. the
     * replanner identified by this class's namespace and
     * {@link #RETRY_LOGICAL_NAME}, without asking for a particular version.
     *
     * @param job      the job whose prescript needs to be set.
     * @param daxURL   the path to the dax file on the filesystem.
     * @param log      the file where the output of the prescript needs to be
     *                 redirected to.
     *
     * @see #RETRY_LOGICAL_NAME
     */
    protected void setPrescript(Job job, String daxURL, String log){
        //no specific version of the replanner is requested
        final String anyVersion = null;

        setPrescript( job, daxURL, log, NAMESPACE, RETRY_LOGICAL_NAME, anyVersion );
    }


    /**
     * Sets the prescript that ends up calling to the default wrapper that
     * introduces retry into Pegasus for a particular job.
     *
     * The path to the prescript is taken from the first matching INSTALLED
     * entry in the transformation catalog. If the lookup yields nothing
     * (null or an empty list), the path defaults to the retry wrapper in the
     * bin directory given by the properties. The arguments are built from a
     * clone of the planner options, adjusted for a deferred planning run.
     *
     * @param job       the job whose prescript needs to be set.
     * @param daxURL    the path to the dax file on the filesystem.
     * @param log       the file where the output of the prescript needs to be
     *                  redirected to.
     * @param namespace the namespace of the replanner utility.
     * @param name      the logical name of the replanner.
     * @param version   the version of the replanner to be picked up.
     *
     * @throws RuntimeException if the transformation catalog lookup fails, or
     *                          if the planner options are null.
     */
    protected void setPrescript(Job job,
                                String daxURL,
                                String log,
                                String namespace,
                                String name,
                                String version){
        String site = job.getSiteHandle();
        TransformationCatalogEntry entry = null;

        //get the path to script wrapper from the transformation catalog
        try{
            List entries = mTCHandle.lookup(namespace,
                                            name,
                                            version,
                                            site,
                                            TCType.INSTALLED);

            //get the first entry from the list returned.
            //Gaurang assures that if no record is found then
            //TC Mechanism returns null. An empty list is tolerated too, so
            //that it is not misreported as a catalog access error.
            if( entries != null && !entries.isEmpty() ){
                entry = (TransformationCatalogEntry) entries.get(0);
            }
        }
        catch(Exception e){
            throw new RuntimeException( "ERROR: While accessing the Transformation Catalog",e);
        }

        //work on a copy, so that the options of the outer planner run
        //are left untouched
        PlannerOptions options =  ( mPOptions == null)? null : (PlannerOptions)mPOptions.clone();
        if( options == null ){
            throw new RuntimeException( "ERROR: Planner Options passed to setPrescript are null" );
        }


        //construct the prescript path
        StringBuffer script = new StringBuffer();
        if(entry == null){
            //log to debug
            mLogger.log("Constructing the default path to the replanner for prescript",
                        LogManager.DEBUG_MESSAGE_LEVEL);

            //construct the default path to the executable
            script.append( mProps.getBinDir() ).append( mSeparator ).
                   append( RETRY_LOGICAL_NAME );
        }
        else{
            script.append(entry.getPhysicalTransformation());
        }

        //the output of the prescript i.e submit files should be created
        //in the directory where the job is being run.
        options.setSubmitDirectory( (String)job.condorVariables.get("initialdir"));


        //generate the remote working directory for the partition
        //NOTE(review): assumes the submit directory contains the random dir;
        //if it does not, indexOf yields -1 and substring throws - confirm
        String submit        = options.getSubmitDirectory(); // like /tmp/vahi/pegasus/blackdiamond/run0001/00/PID1
        String remoteBase    = mPOptions.getRandomDir(); // like vahi/pegasus/blackdiamond/run0001
        String remoteWorkDir = submit.substring( submit.indexOf( remoteBase) ); //gets us vahi/pegasus/blackdiamond/run0001/00/PID1


        //the relative dir option is used instead of overriding the random
        //dir with the remote working directory. Karan April 10, 2008

        //set the base and relative submit dir
        options.setBaseSubmitDirectory( mPOptions.getBaseSubmitDirectory() );
        options.setRelativeDirectory( remoteWorkDir );

        //set the basename for the nested dag as the ID of the job.
        //which is actually the basename of the deep lfn job name!!
        options.setBasenamePrefix( getBasenamePrefix(job));

        //set the flag designating that the planning invocation is part
        //of a deferred planning run
        options.setPartOfDeferredRun( true );

        //in case of deferred planning cleanup wont work
        //explicitly turn it off if the file cleanup scope is fullahead
        if( mCleanupScope.equals( PegasusProperties.CLEANUP_SCOPE.fullahead ) ){
            options.setCleanup( false );
        }

        //we want monitoring to happen
        options.setMonitoring( true );

        //construct the argument string.
        //add the jvm options and the pegasus options if any
        StringBuffer arguments = new StringBuffer();
        arguments.append( " -Dpegasus.log.*=").append(log).
                  //add other jvm options that user may have specified
                  append( options.toJVMOptions() ).
                  append(" --conf ").append( mMDAGPropertiesFile ).
                  //the dax argument is diff for each partition
                  append(" --dax ").append( daxURL ).
                  //put in all the other options.
                  append( options.toOptions());

        //set the path and the arguments to prescript
        job.setPreScript( script.toString(), arguments.toString());
    }



    /**
     * Returns the base name of the submit directory in which the submit files
     * for a particular partition reside. The name is the partition id
     * prefixed with the letter P.
     *
     * @param partition  the partition for which the base directory is to be
     *                   constructed.
     *
     * @return the base name of the partition.
     */
    protected String getBaseName( Partition partition ){
        //P followed by the id of the partition
        return "P".concat( partition.getID() );
    }

    /**
     * Returns the absolute path to a dagman (usually) related file for a
     * particular partition in the submit directory that is passed as an input
     * parameter. No file is created; the path is only put together as
     * directory, separator and the basename from
     * {@link #getBasename(Partition, String)}. Useful for constructing
     * argument string for condor_dagman.
     *
     * @param partition  the partition for which the dagman is responsible for
     *                   execution.
     * @param directory  the directory where the file should reside.
     * @param suffix     the suffix for the file basename.
     *
     * @return  the absolute path to a file in the submit directory.
     */
    protected String getAbsolutePath( Partition partition,
                                      String directory,
                                      String suffix){

        //the basename already carries the P prefix
        String basename = getBasename( partition, suffix );

        return directory + mSeparator + basename;
    }

    /**
     * Returns the basename of a dagman (usually) related file for a particular
     * partition, i.e. the letter P, the partition id and the suffix, in that
     * order.
     *
     * @param partition  the partition for which the dagman is responsible for
     *                   execution.
     * @param suffix     the suffix for the file basename.
     *
     * @return the basename.
     */
    protected String getBasename( Partition partition, String suffix ){
        //prefix P, then the partition id, then the suffix
        return "P" + partition.getID() + suffix;
    }


    /**
     * Returns the basename prefix of a dagman (usually) related file for a
     * job that submits nested dagman. The prefix is the letter P followed by
     * the logical id of the job.
     *
     * @param job    the job that submits a nested dagman.
     *
     * @return the basename prefix.
     */
    protected String getBasenamePrefix( Job job ){
        //prefix P, then the logical id of the job
        return "P" + job.getLogicalID();
    }

    /**
     * Returns the full path to a cache file that corresponds for one partition.
     * The path is the value of the <code>initialdir</code> condor variable of
     * the job, followed by the basename prefix of the job and the
     * <code>.cache</code> extension.
     *
     * @param job  the job running on the submit host that submits the partition.
     *
     * @return the full path to the file.
     */
    protected String getCacheFilePath(Job job){
        //cache file is being generated in the initialdir set for the job.
        //initialdir is set correctly to the submit directory for nested dag.
        Object initialDir = job.condorVariables.get("initialdir");

        return initialDir + File.separator + getBasenamePrefix(job) + ".cache";
    }


    /**
     * Creates a symbolic link to a source file in a destination directory by
     * invoking <code>ln -s</code>.
     *
     * The command is handed to the runtime as an argument array, so that
     * paths containing whitespace are passed intact. If the calling thread
     * is interrupted while waiting, the interrupt status is restored and
     * false is returned.
     *
     * @param source  the source file that has to be symlinked.
     * @param destDir the destination directory where the symlink has to be
     *                placed.
     *
     * @return true if the <code>ln</code> command exited with status 0,
     *         false otherwise.
     */
    protected boolean createSymlink( String source, File destDir ){
        //do some sanity checks on the source and the destination
        File f = new File( source );
        if( !f.exists() || !f.canRead()){
            mLogger.log("The source for symlink does not exist " + source,
                        LogManager.ERROR_MESSAGE_LEVEL);
            return false;
        }
        if( !destDir.exists() || !destDir.isDirectory() || !destDir.canWrite()){
            mLogger.log("The destination directory cannot be written to " + destDir,
                        LogManager.ERROR_MESSAGE_LEVEL);
            return false;
        }

        try{
            String destination = destDir.getAbsolutePath();

            //the single string form is kept for logging only. the array form
            //is what gets executed, as exec(String) splits on whitespace
            String command = "ln -s " + source + " " + destination;
            String[] cmdarray = { "ln", "-s", source, destination };
            mLogger.log("Creating symlink " + command,
                        LogManager.DEBUG_MESSAGE_LEVEL);
            Process p = Runtime.getRuntime().exec( cmdarray );

            //spawn off the gobblers with the already initialized default callback
            StreamGobbler ips =
                new StreamGobbler(p.getInputStream(), mDefaultCallback);
            StreamGobbler eps =
                new StreamGobbler(p.getErrorStream(), mDefaultCallback);

            ips.start();
            eps.start();

            //wait for the threads to finish off
            ips.join();
            eps.join();

            //get the status
            int status = p.waitFor();
            if( status != 0){
                mLogger.log("Command " + command + " exited with status " + status,
                            LogManager.DEBUG_MESSAGE_LEVEL);
                return false;
            }
            return true;
        }
        catch(IOException ioe){
            mLogger.log("IOException while creating symbolic links ", ioe,
                        LogManager.ERROR_MESSAGE_LEVEL);
        }
        catch( InterruptedException ie){
            //do not lose the interrupt, callers up the stack may rely on it
            Thread.currentThread().interrupt();
        }
        return false;
    }


    /**
     * Returns the number of partitions referred to in the PDAX file.
     *
     * The count is obtained by running <code>grep</code> for the string
     * <code>&lt;partition</code> on the file and counting the occurrences
     * in the output. The command is handed to the runtime as an argument
     * array, so that a pdax path containing whitespace is passed intact.
     * If the calling thread is interrupted while waiting, the interrupt
     * status is restored and the count gathered so far is returned.
     *
     * @param pdax  the path to the pdax file.
     *
     * @return the number of partitions in the pdax file.
     *
     * @throws RuntimeException if the pdax file does not exist or is unreadable.
     */
    protected int getPartitionCount( String pdax ){
        int result = 0;
        File f = new File( pdax );
        if( !f.exists() || !f.canRead()){
            throw new RuntimeException( "PDAX File is unreadable " + pdax);
        }

        try{
            //set the callback and run the grep command
            String word = "<partition";
            GrepCallback c = new GrepCallback(word);
            Runtime r = Runtime.getRuntime();
            String env[] = {"PATH=/bin:/usr/bin"};

            //the single string form is kept for logging only. the array form
            //is what gets executed, as exec(String) splits on whitespace
            String command = "grep " + word + " " + pdax;
            String[] cmdarray = { "grep", word, pdax };
            Process p = r.exec(cmdarray, env);

            //spawn off the gobblers
            StreamGobbler ips = new StreamGobbler(p.getInputStream(), c);
            StreamGobbler eps = new StreamGobbler(p.getErrorStream(),
                                                  new StreamGobblerCallback(){
                                                      //we cannot log to any of the default stream
                                                      LogManager mLogger =  LogManagerFactory.loadSingletonInstance();

                                                      public void work(String s){
                                                          mLogger.log("Output on stream gobller error stream " +
                                                                      s,LogManager.DEBUG_MESSAGE_LEVEL);
                                                      }
                                                     });
            ips.start();
            eps.start();

            //wait for the threads to finish off. the count is complete
            //once the gobbler on the standard output is done
            ips.join();
            result = c.getCount();
            eps.join();

            //get the status
            int status = p.waitFor();
            if( status != 0){
                mLogger.log("Command " + command + " exited with status " + status,
                            LogManager.WARNING_MESSAGE_LEVEL);
            }

        }
        catch(IOException ioe){
            mLogger.log("IOException while determining partition count ", ioe,
                        LogManager.ERROR_MESSAGE_LEVEL);
        }
        catch( InterruptedException ie){
            //do not lose the interrupt, callers up the stack may rely on it
            Thread.currentThread().interrupt();
        }
        return result;
    }

    /**
     * Looks up the job that has been constructed for a particular partition
     * in the internal job map.
     *
     * @param id  the partition id.
     *
     * @return the corresponding job, else null if not found.
     */
    protected Job getJob(String id){
        //a missing mapping yields null, which passes through the cast as is
        return (Job) mJobMap.get( id );
    }

    /**
     * Creates the submit directory for the workflow. This is not thread safe.
     *
     * The directory is laid out as user/vogroup/label/leaf underneath the
     * base directory. The leaf is either a timestamp, or the submit directory
     * prefix followed by a run number that is one more than the highest
     * existing run directory (starting at 1).
     *
     * @param label   the label of the workflow being worked upon.
     * @param dir     the base directory specified by the user.
     * @param user    the username of the user.
     * @param vogroup the vogroup to which the user belongs to.
     * @param timestampBased boolean indicating whether to have a timestamp based dir or not
     *
     * @return  the directory name created relative to the base directory passed
     *          as input.
     *
     * @throws IOException in case of unable to create submit directory, or
     *                     unable to list the existing run directories.
     */
    protected String createSubmitDirectory( String label,
                                            String dir,
                                            String user,
                                            String vogroup,
                                            boolean timestampBased ) throws IOException {
        File base = new File( dir );
        StringBuffer result = new StringBuffer();

        //do a sanity check on the base
        sanityCheck( base );

        //add the user name if possible
        //NOTE(review): the user level directory is not sanity checked on its
        //own; presumably the check on the vogroup level covers it - confirm
        base = new File( base, user );
        result.append( user ).append( File.separator );

        //add the vogroup
        base = new File( base, vogroup );
        sanityCheck( base );
        result.append( vogroup ).append( File.separator );

        //add the label of the DAX
        base = new File( base, label );
        sanityCheck( base );
        result.append( label ).append( File.separator );

        //create the directory name
        StringBuffer leaf = new StringBuffer();
        if( timestampBased ){
            leaf.append( mPOptions.getDateTime( mProps.useExtendedTimeStamp() ) );
        }
        else{
            //get all the run directories in this directory
            String[] files = base.list( new RunDirectoryFilenameFilter() );
            if( files == null ){
                //list returns null if base is not a directory or on an I/O error
                throw new IOException( "Unable to list the contents of directory " +
                                       base.getAbsolutePath() );
            }

            //find the maximum run directory, and go one beyond it
            int num, max = 1;
            for( int i = 0; i < files.length ; i++ ){
                num = Integer.parseInt( files[i].substring( SUBMIT_DIRECTORY_PREFIX.length() ) );
                if ( num + 1 > max ){ max = num + 1; }
            }

            //create the directory name
            leaf.append( SUBMIT_DIRECTORY_PREFIX ).append( mNumFormatter.format( max ) );
        }
        result.append( leaf.toString() );
        base = new File( base, leaf.toString() );
        mLogger.log( "Directory to be created is " + base.getAbsolutePath(),
                     LogManager.DEBUG_MESSAGE_LEVEL );
        sanityCheck( base );

        return result.toString();
    }

    /**
     * Constructs any extra arguments that need to be passed to dagman, as
     * determined from the properties file.
     *
     * Each row of <code>DAGMAN_KNOBS</code> pairs a property name with the
     * dagman option to emit. An option is added only when the property parses
     * to an integer greater than zero.
     *
     * @param properties   the <code>PegasusProperties</code>
     *
     * @return any arguments to be added, else empty string
     */
    public static String constructDAGManKnobs( PegasusProperties properties ){
        StringBuffer knobs = new StringBuffer();

        //walk through all the dagman knobs
        for( String[] knob : PDAX2MDAG.DAGMAN_KNOBS ){
            //knob[0] is the property name, knob[1] the option to add
            int setting = parseInt( properties.getProperty( knob[0] ) );
            if( setting <= 0 ){
                //unset, invalid or non positive, nothing to add
                continue;
            }
            knobs.append( knob[1] ).append( setting );
        }

        return knobs.toString();
    }

    /**
     * Parses a string into an integer, mapping anything that is not a valid
     * integer (including null) to -1.
     *
     * @param s  the String to be parsed as integer
     *
     * @return the int value if valid, else -1
     */
    protected static int parseInt( String s ){
        try{
            return Integer.parseInt( s );
        }
        catch( NumberFormatException invalid ){
            //null, empty and non numeric strings all end up here
            return -1;
        }
    }

    /**
     * A small utility method that constructs the name of the Condor files
     * that are generated when a dag is submitted, joining the name and the
     * index with the default separator _ .
     *
     * @param name  the name attribute in the partition element of the pdax.
     * @param index the partition number of the partition.
     * @param suffix the suffix that needs to be added to the filename.
     *
     * @return the name of the condor file.
     */
    private String getCondorFileName(String name, int index, String suffix){
        final String defaultSeparator = "_";
        return getCondorFileName( name, index, suffix, defaultSeparator );
    }


    /**
     * A small utility method that constructs the name of the Condor files
     * that are generated when a dag is submitted. The result is the name,
     * the separator, the index and the suffix, in that order. No directory
     * is prepended.
     *
     * @param name  the name attribute in the partition element of the pdax.
     * @param index the partition number of the partition.
     * @param suffix the suffix that needs to be added to the filename
     * @param separator the separator that is to be used while constructing
     *                  the filename.
     *
     * @return the name of the condor file
     */
    private String getCondorFileName(String name, int index, String suffix,
                                     String separator){
        //only the basename is constructed here, the submit directory
        //is not made part of it
        return name + separator + index + suffix;
    }


    /**
     * An inner class, that implements the StreamGobblerCallback to count
     * the occurrences of a word across all the lines it is handed.
     *
     */
    private class GrepCallback implements StreamGobblerCallback{

        /**
         * The word whose occurrences are counted.
         */
        private String mWord;

        /**
         * The length of the word, 0 if the word is null.
         */
        private int mWordLength;

        /**
         * The running total of occurrences seen so far.
         */
        private int mCount;

        /**
         * Overloaded Constructor.
         *
         * @param word  the word to be searched for.
         */
        public GrepCallback( String word ){
            mCount = 0;
            mWord = word;
            mWordLength = (word != null) ? word.length() : 0;
        }

        /**
         * Callback whenever a line is read from the stream by the StreamGobbler.
         * Adds the number of non overlapping occurrences of the word in the
         * line to the running total.
         *
         * @param line   the line that is read.
         */
        public void work( String line ){

            //with a null or empty word the search position would never
            //advance, so bail out instead of looping forever
            if( mWordLength == 0 ){
                return;
            }

            //each search resumes right behind the previous hit
            for( int hit = line.indexOf( mWord );
                 hit != -1;
                 hit = line.indexOf( mWord, hit + mWordLength ) ){
                mCount++;
            }
        }

        /**
         * Returns the number of occurrences counted so far.
         *
         * @return the number of words
         */
        public int getCount(){
            return mCount;
        }

        /**
         * Resets the running total to zero.
         */
        public void reset(){
            mCount = 0;
        }

    }
}
/**
 * A filename filter for identifying the run directory, i.e. names that
 * consist of the submit directory prefix followed by exactly four digits.
 *
 * @author Karan Vahi vahi@isi.edu
 */
class RunDirectoryFilenameFilter implements FilenameFilter {

    /**
     * The regular expression that a run directory name has to match in full:
     * the submit directory prefix followed by four digits.
     */
    private static final String mRegexExpression =
                                     "(" + PDAX2MDAG.SUBMIT_DIRECTORY_PREFIX + ")([0-9][0-9][0-9][0-9])";

    /**
     * The compiled pattern. Compiled once when the class is initialised,
     * instead of lazily on first use, so that no unsynchronised
     * check-then-set of a shared static field is needed.
     */
    private static final Pattern mPattern = Pattern.compile( mRegexExpression );



    /**
     * Tests if a specified file should be included in a file list.
     *
     * @param dir the directory in which the file was found.
     * @param name - the name of the file.
     *
     * @return  true if and only if the whole name matches the run directory
     *          pattern, false otherwise.
     *
     *
     */
     public boolean accept( File dir, String name) {
         return mPattern.matcher( name ).matches();
     }


}
